package cn.xshi.iot.live.worker;

import cn.xshi.iot.live.util.CacheUtil;
import cn.xshi.iot.live.util.PushUtil;
import cn.xshi.iot.live.model.CameraEntity;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @Desc 执行线程（推送器）
 * @Author 邓纯杰
 * @CreateTime 2012-12-12 12:12:12
 */
@Slf4j
public class FlexPusher {

	private final static Logger logger = LoggerFactory.getLogger(FlexPusher.class);

	public static class MyRunnable implements Runnable {
		// 创建线程池
		public static ExecutorService es = Executors.newCachedThreadPool();
		private CameraEntity cameraEntity;
		private Thread nowThread;
		public MyRunnable(CameraEntity cameraEntity) {
			this.cameraEntity = cameraEntity;
		}
		// 中断线程
		public void setInterrupted() {
			nowThread.interrupt();
		}
		// 中断线程
		public void setInterrupted(CameraEntity cameraEntity) {
			this.cameraEntity = cameraEntity;
//			nowThread.interrupt();
		}
		@Override
		public void run() {
			try {
				if(cameraEntity.getErrorCount()<=100){
					// 获取当前线程存入缓存
					nowThread = Thread.currentThread();
					cameraEntity.setOpenTime(new Date());//记录上次播放时间
//					log.info("进入拉流时设置播放时间："+cameraEntity.getOpenTime());
//					CacheUtil.STREAMMAP.put(cameraEntity.getToken(), cameraEntity);
					// 执行转流推流任务
					PushUtil push = new PushUtil(cameraEntity).from();
					if (push != null) {
						cameraEntity.setOpenTime(new Date());//记录上次播放时间
						log.info("成功转封装设置播放时间："+cameraEntity.getOpenTime());
						CacheUtil.STREAMMAP.put(cameraEntity.getToken(), cameraEntity);
						push.to().go(nowThread);
					}
				}
				// 清除缓存
				log.info("清理缓存对象开始...");
				CacheUtil.STREAMMAP.remove(cameraEntity.getToken());
				CacheUtil.jobMap.remove(cameraEntity.getToken());
				log.info("清理缓存对象完毕...");
			} catch (Exception e) {
				logger.error("当前任务： " + cameraEntity.getRtsp() + "停止...");
				CacheUtil.STREAMMAP.remove(cameraEntity.getToken());
				CacheUtil.jobMap.remove(cameraEntity.getToken());
				cameraEntity.setErrorCount(cameraEntity.getErrorCount()+1);
			}finally {
				logger.info("当前任务终止，名称："+nowThread.getName()+"，ID："+nowThread.getId()+"，ErrorCount："+cameraEntity.getErrorCount()+"，当前设备编号："+cameraEntity.getErrorCount());
				logger.info("重新加入保活线程中开始.....");
				if(cameraEntity.getErrorCount()<=100){
					if(null != cameraEntity.getStartTime()){//回放流
						//处理中断后重推的开始时间
						long times = calculateTimes(cameraEntity.getOpenTime(),new Date());
						log.info("已经播放持续时间："+times+"毫秒，上次播放时间："+cameraEntity.getOpenTime());
						cameraEntity.setOpenTime(new Date());//记录本次播放时间（用于下一次计算）
						long newStartTimes = cameraEntity.getStartTime().getTime()+times;
						Date newStart = new Date(newStartTimes);
						if(compareDate(newStart,cameraEntity.getEndTime())){//如果开始时间大于等于结束时间 回放结束
							log.info("回放结束！");
						}else{
							cameraEntity.setStartTime(newStart);//重新设置开始时间（上一次播放记录）
							KeepAliveWorker.put(cameraEntity);//进行保活
							logger.info("重新加入保活线程中完毕.....");
						}
					}else{//直播流
						KeepAliveWorker.put(cameraEntity);//进行 保活
						logger.info("重新加入保活线程中完毕.....");
					}
				}else{
					log.info("终止连接，原因始终无法连接成功，该设备无效，ID号："+cameraEntity.getStream());
				}
			}
		}
	}

	/**
	 * 时间差
	 * @param startDate
	 * @param endDate
	 * @return
	 */
	public static long calculateTimes(Date startDate, Date endDate){
		long times = (endDate.getTime()-startDate.getTime());//毫秒
		return times;
	}

	/**
	 * 日期比较
	 * @param startDate
	 * @param endDate
	 * @return 如果startDate >= endDate 返回true 否则返回false
	 */
	public static boolean compareDate(Date startDate, Date endDate) {
		if(startDate==null||startDate==null){
			return false;
		}
		boolean res = startDate.after(endDate);
		return res;
	}
}
